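Preface: This article, compiled by the editors of 编程笔记 (Programming Notes), walks through the Flink 1.15 source code for starting the JobManager, focusing on how WebMonitorEndpoint is started. We hope you find it useful.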
From the previous article, Flink 1.15 Source Code Analysis: DispatcherResourceManagerComponent, we know where WebMonitorEndpoint is created and started:
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
// Build a thread pool that executes the requests WebMonitorEndpoint receives from clients
final ScheduledExecutorService executor =
        WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");

// Initialize the MetricFetcher; the default update interval is 10 s, and 0 disables fetching
final long updateInterval =
        configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher =
        updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                        configuration,
                        metricQueryServiceRetriever,
                        dispatcherGatewayRetriever,
                        executor);

// Create WebMonitorEndpoint, one of the three core components (with Dispatcher and ResourceManager)
webMonitorEndpoint =
        restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);

// Start WebMonitorEndpoint
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
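To give a feel for the thread pool built at the top of this snippet, here is a minimal JDK-only sketch of a scheduled pool with a fixed size, named threads and a configurable priority. The class name RestExecutorSketch, the thread-naming scheme and the daemon flag are assumptions for illustration, not Flink's code; in Flink the two numbers come from RestOptions.SERVER_NUM_THREADS and RestOptions.SERVER_THREAD_PRIORITY.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, modelled loosely on WebMonitorEndpoint.createExecutorService:
// a fixed-size scheduled pool whose threads get a name prefix and a priority.
final class RestExecutorSketch {

    private RestExecutorSketch() {}

    static ScheduledExecutorService createExecutorService(
            int numThreads, int threadPriority, String restEndpointName) {
        // Reject priorities the JVM would not accept anyway.
        if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException("Invalid thread priority: " + threadPriority);
        }
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory factory =
                runnable -> {
                    // Naming scheme is an assumption made for this sketch.
                    Thread t =
                            new Thread(
                                    runnable,
                                    "Flink-" + restEndpointName + "-thread-" + counter.getAndIncrement());
                    t.setPriority(threadPriority);
                    // Assumption: daemon threads, so REST workers never keep the JVM alive.
                    t.setDaemon(true);
                    return t;
                };
        return Executors.newScheduledThreadPool(numThreads, factory);
    }
}
```

Validating the priority up front turns a bad configuration value into an immediate, descriptive error instead of a failure when the first worker thread is created.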
In this article we walk through how WebMonitorEndpoint is built and started, in detail.
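WebMonitorEndpoint is built by restEndpointFactory. Which restEndpointFactory is used is decided when the DispatcherResourceManagerComponentFactory is created, and depends on how the cluster is started.
Next, taking StandaloneSessionClusterEntrypoint as an example, let's look at how restEndpointFactory is initialized.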
1. StandaloneSessionClusterEntrypoint creates a DefaultDispatcherResourceManagerComponentFactory
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#createDispatcherResourceManagerComponentFactory
@Override
protected DefaultDispatcherResourceManagerComponentFactory
        createDispatcherResourceManagerComponentFactory(Configuration configuration) {
    return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
            StandaloneResourceManagerFactory.getInstance());
}
2. createSessionComponentFactory creates the factories for all three core components
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#createSessionComponentFactory
public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
        ResourceManagerFactory<?> resourceManagerFactory) {
    return new DefaultDispatcherResourceManagerComponentFactory(
            DefaultDispatcherRunnerFactory.createSessionRunner(
                    SessionDispatcherFactory.INSTANCE),
            resourceManagerFactory,
            SessionRestEndpointFactory.INSTANCE);
}
So restEndpointFactory is SessionRestEndpointFactory.INSTANCE, and this factory creates a DispatcherRestEndpoint:
/** {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}. */
public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway> {
    INSTANCE;

    @Override
    public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
            Configuration configuration,
            LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
            LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            TransientBlobService transientBlobService,
            ScheduledExecutorService executor,
            MetricFetcher metricFetcher,
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        final RestHandlerConfiguration restHandlerConfiguration =
                RestHandlerConfiguration.fromConfiguration(configuration);

        // Create the DispatcherRestEndpoint
        return new DispatcherRestEndpoint(
                dispatcherGatewayRetriever,
                configuration,
                restHandlerConfiguration,
                resourceManagerGatewayRetriever,
                transientBlobService,
                executor,
                metricFetcher,
                leaderElectionService,
                RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
                fatalErrorHandler);
    }
}
The DispatcherRestEndpoint created here is the REST endpoint of the Dispatcher:
/** REST endpoint for the {@link Dispatcher} component. */
public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway> {
    // ......
}
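SessionRestEndpointFactory is an enum with a single INSTANCE, the usual Java idiom for a stateless singleton: the JVM creates the constant exactly once, so no locking or lazy initialization is needed. A stripped-down sketch of that shape follows. The types EndpointFactorySketch, EndpointSketch and DispatcherGatewaySketch are hypothetical stand-ins for RestEndpointFactory, WebMonitorEndpoint and DispatcherGateway, and the parameter list is drastically shortened.

```java
import java.util.Objects;

// Hypothetical stand-in for RestEndpointFactory<T>: the type parameter names the gateway.
interface EndpointFactorySketch<G> {
    EndpointSketch<G> createRestEndpoint(String bindAddress, int port);
}

// Hypothetical stand-in for WebMonitorEndpoint<G>: it only records its inputs.
class EndpointSketch<G> {
    final String gatewayKind;
    final String bindAddress;
    final int port;

    EndpointSketch(String gatewayKind, String bindAddress, int port) {
        this.gatewayKind = gatewayKind;
        this.bindAddress = Objects.requireNonNull(bindAddress);
        this.port = port;
    }
}

// Marker type standing in for DispatcherGateway.
interface DispatcherGatewaySketch {}

// Enum singleton: the factory holds no state, so one shared INSTANCE is enough.
enum SessionEndpointFactorySketch implements EndpointFactorySketch<DispatcherGatewaySketch> {
    INSTANCE;

    @Override
    public EndpointSketch<DispatcherGatewaySketch> createRestEndpoint(String bindAddress, int port) {
        // Flink's session factory builds a DispatcherRestEndpoint at this point.
        return new EndpointSketch<>("dispatcher", bindAddress, port);
    }
}
```

Because the entrypoint only depends on the factory interface, a different entrypoint can pass a different enum constant without touching DefaultDispatcherResourceManagerComponentFactory.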
WebMonitorEndpoint extends RestServerEndpoint, so webMonitorEndpoint.start() actually runs org.apache.flink.runtime.rest.RestServerEndpoint#start:
// 1. First create the Router, which parses client requests and finds the matching handler
final Router router = new Router();

// 2. Register the handlers
// 2.1 Initialize the handlers
final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
handlers = initializeHandlers(restAddressFuture);

// 2.2 Sort the handlers so that static URL segments come before path parameters and
// wildcards, so that each request is dispatched to the most specific handler
/* sort the handlers such that they are ordered the following:
 * /jobs
 * /jobs/overview
 * /jobs/:jobid
 * /jobs/:jobid/config
 * /:*
 */
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

// 2.3 After sorting, checkAllEndpointsAndHandlersAreUnique verifies that endpoints and handlers are unique
checkAllEndpointsAndHandlersAreUnique(handlers);

// 2.4 Register the handlers with the router
handlers.forEach(handler -> registerHandler(router, handler, log));
// 3.1 Create the ChannelInitializer that builds the pipeline of every new connection
ChannelInitializer<SocketChannel> initializer =
        new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws ConfigurationException {
                RouterHandler handler = new RouterHandler(router, responseHeaders);

                // SSL should be the first handler in the pipeline
                if (isHttpsEnabled()) {
                    ch.pipeline()
                            .addLast(
                                    "ssl",
                                    new RedirectingSslHandler(
                                            restAddress,
                                            restAddressFuture,
                                            sslHandlerFactory));
                }

                ch.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new FileUploadHandler(uploadDir))
                        .addLast(
                                new FlinkHttpObjectAggregator(
                                        maxContentLength, responseHeaders));

                for (InboundChannelHandlerFactory factory :
                        inboundChannelHandlerFactories) {
                    Optional<ChannelHandler> channelHandler =
                            factory.createHandler(configuration, responseHeaders);
                    if (channelHandler.isPresent()) {
                        ch.pipeline().addLast(channelHandler.get());
                    }
                }

                ch.pipeline()
                        .addLast(new ChunkedWriteHandler())
                        .addLast(handler.getName(), handler)
                        .addLast(new PipelineErrorHandler(log, responseHeaders));
            }
        };
// 3.2 Netty event loop groups: one boss thread accepts connections;
// 0 worker threads means Netty's default thread count
NioEventLoopGroup bossGroup =
        new NioEventLoopGroup(
                1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
NioEventLoopGroup workerGroup =
        new NioEventLoopGroup(
                0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

bootstrap = new ServerBootstrap();
bootstrap
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(initializer);
// 3.3 Bind the REST endpoint
// 3.3.1 Parse the configured port range
Iterator<Integer> portsIterator;
try {
    portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch (IllegalConfigurationException e) {
    throw e;
} catch (Exception e) {
    throw new IllegalArgumentException(
            "Invalid port range definition: " + restBindPortRange);
}

// 3.3.2 Handle port conflicts: try each port in turn until one can be bound
int chosenPort = 0;
while (portsIterator.hasNext()) {
    try {
        chosenPort = portsIterator.next();
        final ChannelFuture channel;
        // Bind to address:port and obtain the channel
        if (restBindAddress == null) {
            channel = bootstrap.bind(chosenPort);
        } else {
            channel = bootstrap.bind(restBindAddress, chosenPort);
        }
        serverChannel = channel.syncUninterruptibly().channel();
        break;
    } catch (final Exception e) {
        // syncUninterruptibly() throws checked exceptions via Unsafe
        // continue if the exception is due to the port being in use, fail early
        // otherwise
        if (!(e instanceof java.net.BindException)) {
            throw e;
        }
    }
}

if (serverChannel == null) {
    throw new BindException(
            "Could not start rest endpoint on any port in port range "
                    + restBindPortRange);
}
log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
final String advertisedAddress;
// If bound to the wildcard address, advertise the configured REST address instead
if (bindAddress.getAddress().isAnyLocalAddress()) {
    advertisedAddress = this.restAddress;
} else {
    advertisedAddress = bindAddress.getAddress().getHostAddress();
}

port = bindAddress.getPort();

log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();

restAddressFuture.complete(restBaseUrl);

state = State.RUNNING;
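Step 2.2 sorts the handlers into the order listed in its comment. One way to reproduce that order is a character-by-character comparator in which ':' (path parameter) and '*' (wildcard) rank after ordinary characters, and a URL that is a prefix of another comes first. This is a sketch under that assumption only: UrlOrderSketch is hypothetical and Flink's own RestHandlerUrlComparator may apply different rules. checkUnique mirrors the intent of step 2.3 with a plain set.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical comparator reproducing the order from the comment in step 2.2:
// static segments first, then path parameters (':'), then wildcards ('*').
final class UrlOrderSketch {

    private UrlOrderSketch() {}

    // 0 = ordinary character, 1 = path parameter marker, 2 = wildcard.
    private static int rank(char c) {
        if (c == '*') {
            return 2;
        }
        return c == ':' ? 1 : 0;
    }

    static final Comparator<String> COMPARATOR =
            (a, b) -> {
                int min = Math.min(a.length(), b.length());
                for (int i = 0; i < min; i++) {
                    char c1 = Character.toLowerCase(a.charAt(i));
                    char c2 = Character.toLowerCase(b.charAt(i));
                    if (c1 != c2) {
                        int byRank = Integer.compare(rank(c1), rank(c2));
                        return byRank != 0 ? byRank : Character.compare(c1, c2);
                    }
                }
                // One URL is a prefix of the other: the shorter one sorts first.
                return Integer.compare(a.length(), b.length());
            };

    static List<String> sorted(List<String> urls) {
        List<String> copy = new ArrayList<>(urls);
        copy.sort(COMPARATOR);
        return copy;
    }

    // Every "METHOD url" endpoint may be registered only once.
    static void checkUnique(List<String> endpoints) {
        Set<String> seen = new HashSet<>();
        for (String endpoint : endpoints) {
            if (!seen.add(endpoint)) {
                throw new IllegalArgumentException("Duplicate REST endpoint: " + endpoint);
            }
        }
    }
}
```

Sorting only decides matching precedence; it cannot detect two handlers claiming the same endpoint, which is why a separate uniqueness check follows it.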
/**
 * Hook to start sub class specific services.
 *
 * @throws Exception if an error occurred
 */
protected abstract void startInternal() throws Exception;
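Step 3.3 of start() walks through a port range and only moves on to the next port when the current one is already in use; any other error aborts immediately. The same strategy can be sketched with plain java.net.ServerSocket instead of Netty. PortRangeBindSketch is hypothetical, and parsePortRange is a simplified stand-in for NetUtils.getPortRangeFromString that, in this sketch, accepts "8081", "50100-50200" and comma-separated mixes of both.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class PortRangeBindSketch {

    private PortRangeBindSketch() {}

    // Hypothetical parser for specs such as "8081", "50100-50200" or "8081,9000-9002".
    static Iterator<Integer> parsePortRange(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            int dash = p.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(p));
            } else {
                int from = Integer.parseInt(p.substring(0, dash).trim());
                int to = Integer.parseInt(p.substring(dash + 1).trim());
                if (from > to) {
                    throw new IllegalArgumentException("Invalid port range definition: " + spec);
                }
                for (int port = from; port <= to; port++) {
                    ports.add(port);
                }
            }
        }
        return ports.iterator();
    }

    // Try each port in order: skip ports that are in use, fail fast on any other error.
    static ServerSocket bindFirstFree(InetAddress address, String portRange) throws IOException {
        Iterator<Integer> ports = parsePortRange(portRange);
        while (ports.hasNext()) {
            int port = ports.next();
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(address, port));
                return socket;
            } catch (BindException e) {
                socket.close(); // port already in use: try the next one
            } catch (IOException e) {
                socket.close();
                throw e; // not a port conflict: give up immediately
            }
        }
        throw new BindException("Could not start rest endpoint on any port in port range " + portRange);
    }
}
```

Distinguishing BindException from other failures matters: a misconfigured address should surface at once instead of being masked by silently trying the rest of the range.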
start() finishes by invoking the startInternal() hook. Let's look at how the subclass implements it:
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#startInternal
@Override
public void startInternal() throws Exception {
    // 1. Leader election
    leaderElectionService.start(this);
    // 2. Periodically clean up the ExecutionGraph cache
    startExecutionGraphCacheCleanupTask();

    if (hasWebUI) {
        log.info("Web frontend listening at {}.", getRestBaseUrl());
    }
}
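The split between RestServerEndpoint#start and WebMonitorEndpoint#startInternal is the template method pattern: the base class fixes the startup sequence in one method and leaves a hook for subclass-specific services. Below is a simplified model, assuming that start() invokes the hook after marking the endpoint as RUNNING. ServerEndpointSketch and WebMonitorSketch are hypothetical, and the events list merely records the steps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the RestServerEndpoint / WebMonitorEndpoint split.
abstract class ServerEndpointSketch {

    enum State { CREATED, RUNNING }

    private final Object lock = new Object();
    private State state = State.CREATED;
    protected final List<String> events = new ArrayList<>();

    // final: subclasses cannot change the common startup sequence...
    public final void start() throws Exception {
        synchronized (lock) {
            if (state != State.CREATED) {
                throw new IllegalStateException("The endpoint cannot be restarted.");
            }
            events.add("router and handlers registered");
            events.add("netty server bound");
            state = State.RUNNING;
            // ...but they can plug in extra work through this hook.
            startInternal();
        }
    }

    State state() {
        synchronized (lock) {
            return state;
        }
    }

    protected abstract void startInternal() throws Exception;
}

// Plays the role of WebMonitorEndpoint: adds leader election and cache cleanup.
class WebMonitorSketch extends ServerEndpointSketch {
    @Override
    protected void startInternal() {
        events.add("leader election started");
        events.add("execution graph cache cleanup scheduled");
    }
}
```

Making start() final keeps the guard against restarting in one place, while each subclass only decides what its hook does.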
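HighAvailabilityServices is initialized beforehand, and which implementation is created depends on the configured high-availability mode.
The leaderElectionService used here is obtained when WebMonitorEndpoint is created, through this argument of createRestEndpoint:
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
In standalone (non-HA) mode, StandaloneHaServices implements it as: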
@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
    synchronized (lock) {
        checkNotShutdown();

        return new StandaloneLeaderElectionService();
    }
}
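To make the mode-dependent choice concrete, here is a hypothetical factory that maps the configured high-availability value to an HA services object exposing only getClusterRestEndpointLeaderElectionService. Only NONE and ZOOKEEPER are modelled, an unset value is assumed to mean NONE, and every name (HaModeSketch, HaServicesSketch, ElectionServiceSketch) is illustrative rather than Flink's API.

```java
import java.util.Locale;

// Hypothetical, minimal stand-ins; Flink's HighAvailabilityServices has many more methods.
interface ElectionServiceSketch {
    String kind();
}

interface HaServicesSketch {
    ElectionServiceSketch getClusterRestEndpointLeaderElectionService();
}

final class HaModeSketch {

    private HaModeSketch() {}

    // Picks the HA services implementation from the configured "high-availability" value.
    static HaServicesSketch create(String highAvailability) {
        // Assumption: an unset value behaves like NONE.
        String mode =
                highAvailability == null ? "NONE" : highAvailability.trim().toUpperCase(Locale.ROOT);
        switch (mode) {
            case "NONE":
                // Non-HA: the election service simply grants leadership to whoever starts it.
                return () -> () -> "standalone";
            case "ZOOKEEPER":
                // HA: leadership is negotiated through ZooKeeper.
                return () -> () -> "zookeeper";
            default:
                throw new IllegalArgumentException("Mode not covered by this sketch: " + highAvailability);
        }
    }
}
```

Callers such as the component factory only see the HaServicesSketch interface, so the rest of the startup code stays identical across modes.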
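In standalone mode, StandaloneLeaderElectionService#start performs no real election: it immediately makes the contender the leader. Here the contender is the WebMonitorEndpoint.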
@Override
public void start(LeaderContender newContender) throws Exception {
    if (contender != null) {
        // Service was already started
        throw new IllegalArgumentException(
                "Leader election service cannot be started multiple times.");
    }

    contender = Preconditions.checkNotNull(newContender);

    // directly grant leadership to the given contender
    contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
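Standalone mode skips the election entirely. A minimal sketch of that behaviour in plain Java, with hypothetical names (StandaloneElectionSketch, LeaderContenderSketch); the all-zero UUID is an assumption standing in for HighAvailabilityServices.DEFAULT_LEADER_ID.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for LeaderContender, reduced to the grant callback.
interface LeaderContenderSketch {
    void grantLeadership(UUID leaderSessionId);
}

// Mirrors the idea of StandaloneLeaderElectionService#start: no election at all.
final class StandaloneElectionSketch {

    // Assumption: a fixed, well-known id playing the role of DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    private LeaderContenderSketch contender;

    void start(LeaderContenderSketch newContender) {
        if (contender != null) {
            throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
        }
        contender = Objects.requireNonNull(newContender);
        // Only one process competes, so leadership is granted synchronously, right away.
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }
}
```

Using a constant leader id is enough here because there is never a second leader whose stale messages would have to be told apart.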
With a real HA setup such as ZooKeeper, the HA services extend AbstractHaServices:
org.apache.flink.runtime.highavailability.AbstractHaServices#getClusterRestEndpointLeaderElectionService
@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
    // Subclasses decide how the leader election service is created
    return createLeaderElectionService(getLeaderPathForRestServer());
}
The subclass implementation (ZooKeeperHaServices):
// org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices#createLeaderElectionService
@Override
protected LeaderElectionService createLeaderElectionService(String leaderPath) {
    return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
}
// Create a DefaultLeaderElectionService
// org.apache.flink.runtime.util.ZooKeeperUtils#createLeaderElectionService
public static DefaultLeaderElectionService createLeaderElectionService(
        final CuratorFramework client, final String path) {
    return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
}
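DefaultLeaderElectionService then starts the leader election; the contender passed in here is the WebMonitorEndpoint.
Flink's leader election is built on the Curator framework. For each contender, a leader election driver (leaderElectionDriver) is created. Once the election settles, the driver receives one of two callbacks: isLeader when the contender gains leadership, and notLeader when it loses leadership.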
// org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#start
@Override
public final void start(LeaderContender contender) throws Exception {
    checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    synchronized (lock) {
        running = true;
        /*
         * Called from WebMonitorEndpoint: the contender is the DispatcherRestEndpoint.
         * Called from ResourceManager: the contender is the ResourceManager.
         * Called from DispatcherRunner: the contender is the DispatcherRunner.
         */
        leaderContender = contender;
        // Create one leader election driver (leaderElectionDriver) per contender
        leaderElectionDriver =
                leaderElectionDriverFactory.createLeaderElectionDriver(
                        this,
                        new LeaderElectionFatalErrorHandler(),
                        leaderContender.getDescription());
        LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
    }
}
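The callback chain described above can be modelled in a few classes: the driver (in Flink, the Curator-backed ZooKeeperLeaderElectionDriver) calls isLeader()/notLeader(), DefaultLeaderElectionService acts as the driver's event handler, and it forwards the outcome to the contender. In this sketch FakeLatchDriver replaces the real driver, issuing a random session UUID on every grant is an assumption, and all names are hypothetical.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical stand-in for LeaderElectionEventHandler.
interface ElectionEventHandlerSketch {
    void onGrantLeadership(UUID newLeaderSessionId);

    void onRevokeLeadership();
}

// Hypothetical stand-in for LeaderContender.
interface ContenderSketch {
    void grantLeadership(UUID leaderSessionId);

    void revokeLeadership();
}

// Stand-in for the driver: Curator would call isLeader()/notLeader() on the real one.
final class FakeLatchDriver {
    private final ElectionEventHandlerSketch handler;

    FakeLatchDriver(ElectionEventHandlerSketch handler) {
        this.handler = handler;
    }

    void isLeader() {
        handler.onGrantLeadership(UUID.randomUUID()); // assumption: fresh id per grant
    }

    void notLeader() {
        handler.onRevokeLeadership();
    }
}

// Plays the role of DefaultLeaderElectionService: bridges driver events to the contender.
final class DefaultElectionServiceSketch implements ElectionEventHandlerSketch {
    private final Object lock = new Object();
    private ContenderSketch contender;
    private UUID currentSessionId;
    FakeLatchDriver driver;

    void start(ContenderSketch newContender, Function<ElectionEventHandlerSketch, FakeLatchDriver> driverFactory) {
        Objects.requireNonNull(newContender, "Contender must not be null.");
        synchronized (lock) {
            if (contender != null) {
                throw new IllegalStateException("Contender was already set.");
            }
            contender = newContender;
            // One driver per contender; the service itself is the driver's event handler.
            driver = driverFactory.apply(this);
        }
    }

    @Override
    public void onGrantLeadership(UUID newLeaderSessionId) {
        synchronized (lock) {
            currentSessionId = newLeaderSessionId;
            contender.grantLeadership(newLeaderSessionId);
        }
    }

    @Override
    public void onRevokeLeadership() {
        synchronized (lock) {
            currentSessionId = null;
            contender.revokeLeadership();
        }
    }

    UUID currentSessionId() {
        synchronized (lock) {
            return currentSessionId;
        }
    }
}
```

Passing `this` into the driver factory is what lets one generic service sit between any driver (ZooKeeper, standalone, test fakes) and any contender.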
ZooKeeperLeaderElectionDriverFactory creates a ZooKeeperLeaderElectionDriver. A LeaderElectionDriver is responsible for running the leader election and storing the leader information.
// org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory#createLeaderElectionDriver
@Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
        LeaderElectionEventHandler leaderEventHandler, // the DefaultLeaderElectionService instance
        FatalErrorHandler fatalErrorHandler, // new LeaderElectionFatalErrorHandler()
        String leaderContenderDescription)
        throws Exception {
    return new ZooKeeperLeaderElectionDriver(
            client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
}
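The driver created here is backed by a Curator LeaderLatch (see the constructor that follows). Roughly, each participant creates an ephemeral sequential znode under the latch path, and the one with the lowest sequence number is the leader. The in-memory sketch below imitates only that ordering rule, with no ZooKeeper involved; InMemoryLatchSketch and its methods are hypothetical, and real Curator behaviour (for example around close() and connection loss) is more involved.

```java
import java.util.TreeMap;

// In-memory imitation of ephemeral-sequential nodes under a latch path.
final class InMemoryLatchSketch {

    interface Listener {
        void isLeader();

        void notLeader();
    }

    private final TreeMap<Long, Listener> participants = new TreeMap<>();
    private long nextSequence = 0;

    // Like creating an EPHEMERAL_SEQUENTIAL node; returns this participant's sequence number.
    synchronized long join(Listener listener) {
        long seq = nextSequence++;
        participants.put(seq, listener);
        // The lowest sequence number leads.
        if (participants.firstKey() == seq) {
            listener.isLeader();
        }
        return seq;
    }

    // Like the node disappearing, e.g. after the participant's session is lost.
    synchronized void leave(long seq) {
        boolean wasLeader = !participants.isEmpty() && participants.firstKey() == seq;
        Listener removed = participants.remove(seq);
        if (removed == null || !wasLeader) {
            return;
        }
        removed.notLeader();
        // The next-lowest participant takes over.
        if (!participants.isEmpty()) {
            participants.firstEntry().getValue().isLeader();
        }
    }
}
```

Ordering by creation sequence means a waiting contender never has to poll: it simply becomes leader when everything ahead of it is gone.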
public ZooKeeperLeaderElectionDriver(
        CuratorFramework client,
        String path,
        LeaderElectionEventHandler leaderElectionEventHandler, // the DefaultLeaderElectionService passed in
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    checkNotNull(path);
    this.client = checkNotNull(client);
    this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
    this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

    leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
    leaderLatch = new LeaderLatch(client, leaderLatchPath);
    this.cache =
            ZooKeeperUtils.createTreeCache(
                    client